/*
 *  Copyright 2009-2016 Weibo, Inc.
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

package cn.uncode.rpc.transport.netty;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import cn.uncode.rpc.common.FutureState;
import cn.uncode.rpc.common.log.Logger;
import cn.uncode.rpc.common.log.LoggerFactory;
import cn.uncode.rpc.core.Future;
import cn.uncode.rpc.core.FutureListener;
import cn.uncode.rpc.core.Request;
import cn.uncode.rpc.core.Response;
import cn.uncode.rpc.core.protocol.RpcProtocolVersion;
import cn.uncode.rpc.exception.TransportException;
import cn.uncode.rpc.util.FrameworkUtil;



/**
 * netty response
 * 
 * <pre>
 * 		1） getValue() :  
 * 
 * 			if (request is timeout or request is cancel or get exception)
 * 				 throw exception; 
 * 			else 
 * 				 return value;
 * 
 * 		2） getException() :
 * 		
 * 			if (task is doing) :
 * 				 return null
 * 			if (task is done and get exception):
 * 				return exception
 * 
 * </pre>
 * 
 * @author maijunsheng
 * @version 创建时间：2013-5-31
 * 
 */
public class NettyResponseFuture implements Response, Future {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(NettyResponseFuture.class);
	
	private volatile FutureState state = FutureState.DOING;

	private final CountDownLatch downLatch = new CountDownLatch(1);
	
	private Response result = null;
	
	public Response waitResponse(long timeOut, TimeUnit timeUnit) throws InterruptedException {
		downLatch.await(timeOut, timeUnit);
		return this.result;
	}

	public void release() {
		downLatch.countDown();
	}

	
	private Exception exception = null;

	private long createTime = System.currentTimeMillis();
	private int timeout = 0;
	private long processTime = 0;

	private Request request;
	private List<FutureListener> listeners;

	public NettyResponseFuture(Request requestObj, int timeout) {
		this.request = requestObj;
		this.timeout = timeout;
	}

	public void onSuccess(Response response) {
		this.result = response;
		this.processTime = response.getProcessTime();
		done();
	}

	public void onFailure(Response response) {
		this.exception = response.getException();
		this.processTime = response.getProcessTime();
		done();
	}
	


	@Override
	public Object getValue() {
		if (!isDoing()) {
			return getValueOrThrowable();
		}
		
		try {
			downLatch.await(timeout, TimeUnit.SECONDS);
		} catch (Exception e) {
			cancel(new TransportException("NettyResponseFuture getValue InterruptedException : "
					+ FrameworkUtil.toString(request) + " cost="
					+ (System.currentTimeMillis() - createTime), e));
		}



		if (isDoing()) {
			timeoutSoCancel();
		}
		
		// don't need to notifylisteners, because onSuccess or
		// onFailure or cancel method already call notifylisteners
		return getValueOrThrowable();
	}

	@Override
	public Exception getException() {
		return exception;
	}

	@Override
	public boolean cancel() {
		Exception e = new TransportException("NettyResponseFuture task cancel: serverPort="
				+ FrameworkUtil.toString(request) + " cost="
				+ (System.currentTimeMillis() - createTime));
		return cancel(e);
	}
	
	private boolean cancel(Exception e) {
		if (!isDoing()) {
			return false;
		}

		state = FutureState.CANCELLED;
		exception = e;
		release();

		notifyListeners();
		return true;
	}

	@Override
	public boolean isCancelled() {
		return state.isCancelledState();
	}

	@Override
	public boolean isDone() {
		return state.isDoneState();
	}

	@Override
	public boolean isSuccess() {
		return isDone() && (exception == null);
	}

	@Override
	public void addListener(FutureListener listener) {
		if (listener == null) {
			throw new NullPointerException("FutureListener is null");
		}

		boolean notifyNow = false;
		if (!isDoing()) {
			// is success, failure, timeout or cancel, don't add into
			// listeners, just notify
			notifyNow = true;
		} else {
			if (listeners == null) {
				listeners = new ArrayList<FutureListener>(1);
			}

			listeners.add(listener);
		}

		if (notifyNow) {
			notifyListener(listener);
		}
	}

	public long getCreateTime() {
		return createTime;
	}

	public Request getRequest() {
		return request;
	}

	public FutureState getState() {
		return state;
	}

	private void timeoutSoCancel() {
		this.processTime = System.currentTimeMillis() - createTime;

		if (!isDoing()) {
			return;
		}
		
		state = FutureState.CANCELLED;
		exception = new TransportException("NettyResponseFuture request timeout: serverPort="
				+ FrameworkUtil.toString(request) + " cost="
				+ (System.currentTimeMillis() - createTime));
		
		release();

		notifyListeners();
	}

	private void notifyListeners() {
		if (listeners != null) {
			for (FutureListener listener : listeners) {
				notifyListener(listener);
			}
		}
	}

	private void notifyListener(FutureListener listener) {
		try {
			listener.operationComplete(this);
		} catch (Throwable t) {
			LOGGER.error("NettyResponseFuture notifyListener Error: " + listener.getClass().getSimpleName(), t);
		}
	}

	private boolean isDoing() {
		return state.isDoingState();
	}

	private boolean done() {
		if (!isDoing()) {
			return false;
		}

		state = FutureState.DONE;
		release();

		notifyListeners();
		return true;
	}

	public long getRequestId() {
		return this.request.getRequestId();
	}

	private Object getValueOrThrowable() {
		if (exception != null) {
			throw (exception instanceof RuntimeException) ? (RuntimeException) exception : new TransportException(
					exception.getMessage(), exception);
		}

		return result;
	}

	@Override
	public long getProcessTime() {
		return processTime;
	}

	@Override
	public void setProcessTime(long time) {
		this.processTime = time;
	}
	
	public int getTimeout() {
	    return timeout;
	}

    @Override
    public Map<String, String> getAttachments() {
        // 不需要使用
        return Collections.EMPTY_MAP;
    }

    @Override
    public void setAttachment(String key, String value) {}

    @Override
    public void setRpcProtocolVersion(byte rpcProtocolVersion) {}

    @Override
    public byte getRpcProtocolVersion() {
        return RpcProtocolVersion.VERSION_1.getVersion();
    }
}
